package com.zhentao.listener;

import com.alibaba.fastjson.JSON;
import com.zhentao.domain.Product;
import com.zhentao.service.ProductService;
import lombok.SneakyThrows;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ProductListener {
    @Autowired
    private ProductService service;
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    @SneakyThrows
    @KafkaListener(topics = "productTopic")
    public void productListener(String msg){
        if (msg!=null){
            Product product = JSON.parseObject(msg, Product.class);
            System.out.println("kafka处理数据"+product);
            product.setState(1);
            service.updateById(product);
            System.out.println(product+"该商品已经成功上架");
        }
    }
}
